import pytest
import unittest
from enum import Enum
import random
import time
import threading
import uuid
import json
import pandas as pd
from datetime import datetime
from prettytable import PrettyTable
import functools
from collections import Counter
from time import sleep
from pymilvus import AnnSearchRequest, RRFRanker, MilvusClient, DataType, CollectionSchema, connections
from pymilvus.milvus_client.index import IndexParams
from pymilvus.bulk_writer import RemoteBulkWriter, BulkFileType
from pymilvus.client.embedding_list import EmbeddingList
from common import common_func as cf
from common import common_type as ct
from common.milvus_sys import MilvusSys
from chaos import constants
from faker import Faker

from common.common_type import CheckTasks
from utils.util_log import test_log as log
from utils.api_request import Error

event_lock = threading.Lock()
request_lock = threading.Lock()


def get_chaos_info():
    try:
        with open(constants.CHAOS_INFO_SAVE_PATH, 'r') as f:
            chaos_info = json.load(f)
    except Exception as e:
        log.warning(f"get_chaos_info error: {e}")
        return None
    return chaos_info


class Singleton(type):
    instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls.instances:
            cls.instances[cls] = super().__call__(*args, **kwargs)
        return cls.instances[cls]


class EventRecords(metaclass=Singleton):

    def __init__(self):
        self.file_name = f"/tmp/ci_logs/event_records_{uuid.uuid4()}.parquet"
        self.created_file = False

    def insert(self, event_name, event_status, ts=None):
        log.info(f"insert event: {event_name}, {event_status}")
        insert_ts = datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S.%f') if ts is None else ts
        data = {
            "event_name": [event_name],
            "event_status": [event_status],
            "event_ts": [insert_ts]
        }
        df = pd.DataFrame(data)
        if not self.created_file:
            with event_lock:
                df.to_parquet(self.file_name, engine='fastparquet')
                self.created_file = True
        else:
            with event_lock:
                df.to_parquet(self.file_name, engine='fastparquet', append=True)

    def get_records_df(self):
        df = pd.read_parquet(self.file_name)
        return df


class RequestRecords(metaclass=Singleton):

    def __init__(self):
        self.file_name = f"/tmp/ci_logs/request_records_{uuid.uuid4()}.parquet"
        self.buffer = []
        self.created_file = False

    def insert(self, operation_name, collection_name, start_time, time_cost, result):
        data = {
            "operation_name": operation_name,
            "collection_name": collection_name,
            "start_time": start_time,
            "time_cost": time_cost,
            "result": result
        }
        self.buffer.append(data)
        if len(self.buffer) > 100:
            df = pd.DataFrame(self.buffer)
            if not self.created_file:
                with request_lock:
                    df.to_parquet(self.file_name, engine='fastparquet')
                    self.created_file = True
            else:
                with request_lock:
                    df.to_parquet(self.file_name, engine='fastparquet', append=True)
            self.buffer = []

    def sink(self):
        if len(self.buffer) == 0:
            return
        try:
            df = pd.DataFrame(self.buffer)
        except Exception as e:
            log.error(f"convert buffer {self.buffer} to dataframe error: {e}")
            return
        if not self.created_file:
            with request_lock:
                df.to_parquet(self.file_name, engine='fastparquet')
                self.created_file = True
        else:
            with request_lock:
                df.to_parquet(self.file_name, engine='fastparquet', append=True)

    def get_records_df(self):
        self.sink()
        df = pd.read_parquet(self.file_name)
        return df


class ResultAnalyzer:

    def __init__(self):
        rr = RequestRecords()
        df = rr.get_records_df()
        df["start_time"] = pd.to_datetime(df["start_time"])
        df = df.sort_values(by='start_time')
        self.df = df
        self.chaos_info = get_chaos_info()
        self.chaos_start_time = self.chaos_info['create_time'] if self.chaos_info is not None else None
        self.chaos_end_time = self.chaos_info['delete_time'] if self.chaos_info is not None else None
        self.recovery_time = self.chaos_info['recovery_time'] if self.chaos_info is not None else None

    def get_stage_success_rate(self):
        df = self.df
        window = pd.offsets.Milli(1000)

        result = df.groupby([pd.Grouper(key='start_time', freq=window), 'operation_name']).apply(lambda x: pd.Series({
            'success_count': x[x['result'] == 'True'].shape[0],
            'failed_count': x[x['result'] == 'False'].shape[0]
        }))
        data = result.reset_index()
        data['success_rate'] = data['success_count'] / (data['success_count'] + data['failed_count']).replace(0, 1)
        grouped_data = data.groupby('operation_name')
        if self.chaos_info is None:
            chaos_start_time = datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S.%f')
            chaos_end_time = datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S.%f')
            recovery_time = datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S.%f')
        else:
            chaos_start_time = self.chaos_info['create_time']
            chaos_end_time = self.chaos_info['delete_time']
            recovery_time = self.chaos_info['recovery_time']
        stage_success_rate = {}
        for name, group in grouped_data:
            log.info(f"operation_name: {name}")
            # spilt data to 3 parts by chaos start time and chaos end time and aggregate the success rate
            data_before_chaos = group[group['start_time'] < chaos_start_time].agg(
                {'success_rate': 'mean', 'failed_count': 'sum', 'success_count': 'sum'})
            data_during_chaos = group[
                (group['start_time'] >= chaos_start_time) & (group['start_time'] <= chaos_end_time)].agg(
                {'success_rate': 'mean', 'failed_count': 'sum', 'success_count': 'sum'})
            data_after_chaos = group[group['start_time'] > recovery_time].agg(
                {'success_rate': 'mean', 'failed_count': 'sum', 'success_count': 'sum'})
            stage_success_rate[name] = {
                'before_chaos': f"{data_before_chaos['success_rate']}({data_before_chaos['success_count']}/{data_before_chaos['success_count'] + data_before_chaos['failed_count']})" if not data_before_chaos.empty else "no data",
                'during_chaos': f"{data_during_chaos['success_rate']}({data_during_chaos['success_count']}/{data_during_chaos['success_count'] + data_during_chaos['failed_count']})" if not data_during_chaos.empty else "no data",
                'after_chaos': f"{data_after_chaos['success_rate']}({data_after_chaos['success_count']}/{data_after_chaos['success_count'] + data_after_chaos['failed_count']})" if not data_after_chaos.empty else "no data",
            }
        log.info(f"stage_success_rate: {stage_success_rate}")
        return stage_success_rate

    def get_realtime_success_rate(self, interval=10):
        df = self.df
        window = pd.offsets.Second(interval)
        result = df.groupby([pd.Grouper(key='start_time', freq=window), 'operation_name']).apply(lambda x: pd.Series({
            'success_count': x[x['result'] == 'True'].shape[0],
            'failed_count': x[x['result'] == 'False'].shape[0]
        }))
        data = result.reset_index()
        data['success_rate'] = data['success_count'] / (data['success_count'] + data['failed_count']).replace(0, 1)
        grouped_data = data.groupby('operation_name')
        return grouped_data

    def show_result_table(self):
        table = PrettyTable()
        table.field_names = ['operation_name', 'before_chaos',
                             f'during_chaos: {self.chaos_start_time}~{self.recovery_time}',
                             'after_chaos']
        data = self.get_stage_success_rate()
        for operation, values in data.items():
            row = [operation, values['before_chaos'], values['during_chaos'], values['after_chaos']]
            table.add_row(row)
        log.info(f"succ rate for operations in different stage\n{table}")


class Op(Enum):
    create = 'create'  # short name for create collection
    create_db = 'create_db'
    create_collection = 'create_collection'
    create_partition = 'create_partition'
    insert = 'insert'
    insert_freshness = 'insert_freshness'
    upsert = 'upsert'
    upsert_freshness = 'upsert_freshness'
    partial_update = 'partial_update'
    flush = 'flush'
    index = 'index'
    create_index = 'create_index'
    drop_index = 'drop_index'
    load = 'load'
    load_collection = 'load_collection'
    load_partition = 'load_partition'
    release = 'release'
    release_collection = 'release_collection'
    release_partition = 'release_partition'
    search = 'search'
    tensor_search = 'tensor_search'
    full_text_search = 'full_text_search'
    hybrid_search = 'hybrid_search'
    query = 'query'
    text_match = 'text_match'
    phrase_match = 'phrase_match'
    json_query = 'json_query'
    geo_query = 'geo_query'
    delete = 'delete'
    delete_freshness = 'delete_freshness'
    compact = 'compact'
    drop = 'drop'  # short name for drop collection
    drop_db = 'drop_db'
    drop_collection = 'drop_collection'
    drop_partition = 'drop_partition'
    load_balance = 'load_balance'
    bulk_insert = 'bulk_insert'
    alter_collection = 'alter_collection'
    add_field = 'add_field'
    rename_collection = 'rename_collection'
    unknown = 'unknown'


timeout = 120
search_timeout = 10
query_timeout = 10

enable_traceback = False
DEFAULT_FMT = '[start time:{start_time}][time cost:{elapsed:0.8f}s][operation_name:{operation_name}][collection name:{collection_name}] -> {result!r}'

request_records = RequestRecords()


def create_index_params_from_dict(field_name: str, index_param_dict: dict) -> IndexParams:
    """Helper function to convert dict-style index params to IndexParams object"""
    index_params = IndexParams()
    params_copy = index_param_dict.copy()
    index_type = params_copy.pop("index_type", "")
    index_params.add_index(field_name=field_name, index_type=index_type, **params_copy)
    return index_params


def trace(fmt=DEFAULT_FMT, prefix='test', flag=True):
    def decorate(func):
        @functools.wraps(func)
        def inner_wrapper(self, *args, **kwargs):
            start_time = datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S.%f')
            start_time_ts = time.time()
            t0 = time.perf_counter()
            res, result = func(self, *args, **kwargs)
            elapsed = time.perf_counter() - t0
            operation_name = func.__name__
            if flag:
                collection_name = self.c_name
                log_str = f"[{prefix}]" + fmt.format(**locals())
                # TODO: add report function in this place, like uploading to influxdb
                try:
                    t0 = time.perf_counter()
                    request_records.insert(operation_name, collection_name, start_time, elapsed, str(result))
                    tt = time.perf_counter() - t0
                    log.debug(f"insert request record cost {tt}s")
                except Exception as e:
                    log.error(e)
                log.debug(log_str)
            if result:
                self.rsp_times.append(elapsed)
                self.average_time = (
                                            elapsed + self.average_time * self._succ) / (self._succ + 1)
                self._succ += 1
                # add first success record if there is no success record before
                if len(self.fail_records) > 0 and self.fail_records[-1][0] == "failure" and \
                        self._succ + self._fail == self.fail_records[-1][1] + 1:
                    self.fail_records.append(("success", self._succ + self._fail, start_time, start_time_ts))
            else:
                self._fail += 1
                self.fail_records.append(("failure", self._succ + self._fail, start_time, start_time_ts))
            return res, result

        return inner_wrapper

    return decorate


def exception_handler():
    def wrapper(func):
        @functools.wraps(func)
        def inner_wrapper(self, *args, **kwargs):
            class_name = None
            function_name = None
            try:
                function_name = func.__name__
                class_name = getattr(self, '__class__', None).__name__ if self else None
                res, result = func(self, *args, **kwargs)
                return res, result
            except Exception as e:
                log_row_length = 300
                e_str = str(e)
                log_e = e_str[0:log_row_length] + '......' if len(e_str) > log_row_length else e_str
                if class_name:
                    log_message = f"Error in {class_name}.{function_name}: {log_e}"
                else:
                    log_message = f"Error in {function_name}: {log_e}"
                log.exception(log_message)
                log.error(log_e)
                return Error(e), False

        return inner_wrapper

    return wrapper


class Checker:
    """
    A base class of milvus operation checker to
       a. check whether milvus is servicing
       b. count operations and success rate
    """

    def __init__(self, collection_name=None, partition_name=None, shards_num=2, dim=8, insert_data=True,
                 schema=None, replica_number=1, **kwargs):
        self.recovery_time = 0
        self._succ = 0
        self._fail = 0
        self.fail_records = []
        self._keep_running = True
        self.rsp_times = []
        self.average_time = 0
        self.scale = 1 * 10 ** 6
        self.files = []
        self.word_freq = Counter()
        self.ms = MilvusSys()
        self.bucket_name = cf.param_info.param_bucket_name

        # Initialize MilvusClient - prioritize uri and token
        if cf.param_info.param_uri:
            uri = cf.param_info.param_uri
        else:
            uri = "http://" + cf.param_info.param_host + ":" + str(cf.param_info.param_port)

        if cf.param_info.param_token:
            token = cf.param_info.param_token
        else:
            token = f"{cf.param_info.param_user}:{cf.param_info.param_password}"
        self.milvus_client = MilvusClient(uri=uri, token=token)

        # Also create a connection for low-level APIs that MilvusClient doesn't support
        self.alias = cf.gen_unique_str("checker_alias_")
        connections.connect(
            alias=self.alias,
            uri=uri,
            token=token
        )
        c_name = collection_name if collection_name is not None else cf.gen_unique_str(
            'Checker_')
        self.c_name = c_name
        p_name = partition_name if partition_name is not None else "_default"
        self.p_name = p_name
        self.p_names = [self.p_name] if partition_name is not None else None

        # Get or create schema
        if self.milvus_client.has_collection(c_name):
            collection_info = self.milvus_client.describe_collection(c_name)
            schema = CollectionSchema.construct_from_dict(collection_info)
        else:
            enable_struct_array_field = kwargs.get("enable_struct_array_field", True)
            enable_dynamic_field = kwargs.get("enable_dynamic_field", True)
            schema = cf.gen_all_datatype_collection_schema(dim=dim, enable_struct_array_field=enable_struct_array_field, enable_dynamic_field=enable_dynamic_field) if schema is None else schema

        log.info(f"schema: {schema}")
        self.schema = schema
        self.dim = cf.get_dim_by_schema(schema=schema)
        self.int64_field_name = cf.get_int64_field_name(schema=schema)
        self.text_field_name = cf.get_text_field_name(schema=schema)
        self.text_match_field_name_list = cf.get_text_match_field_name(schema=schema)
        self.float_vector_field_name = cf.get_float_vec_field_name(schema=schema)

        # Create collection if not exists
        if not self.milvus_client.has_collection(c_name):
            self.milvus_client.create_collection(
                collection_name=c_name,
                schema=schema,
                shards_num=shards_num,
                timeout=timeout
            )
        self.scalar_field_names = cf.get_scalar_field_name_list(schema=schema)
        self.json_field_names = cf.get_json_field_name_list(schema=schema)
        self.geometry_field_names = cf.get_geometry_field_name_list(schema=schema)
        self.float_vector_field_names = cf.get_float_vec_field_name_list(schema=schema)
        self.binary_vector_field_names = cf.get_binary_vec_field_name_list(schema=schema)
        self.int8_vector_field_names = cf.get_int8_vec_field_name_list(schema=schema)
        self.bm25_sparse_field_names = cf.get_bm25_vec_field_name_list(schema=schema)
        self.emb_list_field_names = cf.get_emb_list_field_name_list(schema=schema)

        # Get existing indexes and their fields
        indexed_fields = set()
        try:
            index_names = self.milvus_client.list_indexes(c_name)
            for idx_name in index_names:
                try:
                    idx_info = self.milvus_client.describe_index(c_name, idx_name)
                    if 'field_name' in idx_info:
                        indexed_fields.add(idx_info['field_name'])
                except Exception as e:
                    log.debug(f"Failed to describe index {idx_name}: {e}")
        except Exception as e:
            log.debug(f"Failed to list indexes: {e}")

        log.debug(f"Already indexed fields: {indexed_fields}")

        # create index for scalar fields
        for f in self.scalar_field_names:
            if f in indexed_fields:
                continue
            try:
                index_params = IndexParams()
                index_params.add_index(field_name=f, index_type="INVERTED")
                self.milvus_client.create_index(
                    collection_name=c_name,
                    index_params=index_params,
                    timeout=timeout
                )
            except Exception as e:
                log.debug(f"Failed to create index for {f}: {e}")

        # create index for json fields
        for f in self.json_field_names:
            if f in indexed_fields:
                continue
            for json_path, json_cast in [("name", "varchar"), ("address", "varchar"), ("count", "double")]:
                try:
                    index_params = IndexParams()
                    index_params.add_index(
                        field_name=f,
                        index_type="INVERTED",
                        params={"json_path": f"{f}['{json_path}']", "json_cast_type": json_cast}
                    )
                    self.milvus_client.create_index(
                        collection_name=c_name,
                        index_params=index_params,
                        timeout=timeout
                    )
                except Exception as e:
                    log.debug(f"Failed to create json index for {f}['{json_path}']: {e}")

        # create index for geometry fields
        for f in self.geometry_field_names:
            if f in indexed_fields:
                continue
            try:
                index_params = IndexParams()
                index_params.add_index(field_name=f, index_type="RTREE")
                self.milvus_client.create_index(
                    collection_name=c_name,
                    index_params=index_params,
                    timeout=timeout
                )
            except Exception as e:
                log.debug(f"Failed to create index for {f}: {e}")

        # create index for float vector fields
        vector_index_created = False
        for f in self.float_vector_field_names:
            if f in indexed_fields:
                vector_index_created = True
                log.debug(f"Float vector field {f} already has index")
                continue
            try:
                index_params = create_index_params_from_dict(f, constants.DEFAULT_INDEX_PARAM)
                self.milvus_client.create_index(
                    collection_name=c_name,
                    index_params=index_params,
                    timeout=timeout
                )
                log.debug(f"Created index for float vector field {f}")
                indexed_fields.add(f)
                vector_index_created = True
            except Exception as e:
                log.warning(f"Failed to create index for {f}: {e}")

        # create index for int8 vector fields
        for f in self.int8_vector_field_names:
            if f in indexed_fields:
                vector_index_created = True
                log.debug(f"Int8 vector field {f} already has index")
                continue
            try:
                index_params = create_index_params_from_dict(f, constants.DEFAULT_INT8_INDEX_PARAM)
                self.milvus_client.create_index(
                    collection_name=c_name,
                    index_params=index_params,
                    timeout=timeout
                )
                log.debug(f"Created index for int8 vector field {f}")
                indexed_fields.add(f)
                vector_index_created = True
            except Exception as e:
                log.warning(f"Failed to create index for {f}: {e}")

        # create index for binary vector fields
        for f in self.binary_vector_field_names:
            if f in indexed_fields:
                vector_index_created = True
                log.debug(f"Binary vector field {f} already has index")
                continue
            try:
                index_params = create_index_params_from_dict(f, constants.DEFAULT_BINARY_INDEX_PARAM)
                self.milvus_client.create_index(
                    collection_name=c_name,
                    index_params=index_params,
                    timeout=timeout
                )
                log.debug(f"Created index for binary vector field {f}")
                indexed_fields.add(f)
                vector_index_created = True
            except Exception as e:
                log.warning(f"Failed to create index for {f}: {e}")

        # create index for bm25 sparse fields
        for f in self.bm25_sparse_field_names:
            if f in indexed_fields:
                continue
            try:
                index_params = create_index_params_from_dict(f, constants.DEFAULT_BM25_INDEX_PARAM)
                self.milvus_client.create_index(
                    collection_name=c_name,
                    index_params=index_params,
                    timeout=timeout
                )
                log.debug(f"Created index for bm25 sparse field {f}")
            except Exception as e:
                log.warning(f"Failed to create index for {f}: {e}")

        # create index for emb list fields
        for f in self.emb_list_field_names:
            if f in indexed_fields:
                continue
            try:
                index_params = create_index_params_from_dict(f, constants.DEFAULT_EMB_LIST_INDEX_PARAM)
                self.milvus_client.create_index(
                    collection_name=c_name,
                    index_params=index_params,
                    timeout=timeout
                )
                log.debug(f"Created index for emb list field {f}")
            except Exception as e:
                log.warning(f"Failed to create index for {f}: {e}")

        # Load collection - only if at least one vector field has an index
        self.replica_number = replica_number
        if vector_index_created:
            try:
                self.milvus_client.load_collection(collection_name=c_name, replica_number=self.replica_number)
                log.debug(f"Loaded collection {c_name} with replica_number={self.replica_number}")
            except Exception as e:
                log.warning(f"Failed to load collection {c_name}: {e}. Collection may need to be loaded manually.")
        else:
            log.warning(f"No vector index created for collection {c_name}, skipping load. You may need to create indexes and load manually.")

        # Create partition if specified
        if p_name != "_default" and not self.milvus_client.has_partition(c_name, p_name):
            self.milvus_client.create_partition(collection_name=c_name, partition_name=p_name)

        # Insert initial data if needed
        num_entities = self.milvus_client.get_collection_stats(c_name).get("row_count", 0)
        if insert_data and num_entities == 0:
            log.info(f"collection {c_name} created, start to insert data")
            t0 = time.perf_counter()
            self.insert_data(nb=constants.ENTITIES_FOR_SEARCH, partition_name=self.p_name)
            log.info(f"insert data for collection {c_name} cost {time.perf_counter() - t0}s")

        self.initial_entities = self.milvus_client.get_collection_stats(c_name).get("row_count", 0)
        self.scale = 100000  # timestamp scale to make time.time() as int64

    def get_schema(self):
        collection_info = self.milvus_client.describe_collection(self.c_name)
        return collection_info

    def insert_data(self, nb=constants.DELTA_PER_INS, partition_name=None):
        partition_name = self.p_name if partition_name is None else partition_name
        client_schema = self.milvus_client.describe_collection(collection_name=self.c_name)
        data = cf.gen_row_data_by_schema(nb=nb, schema=client_schema)
        ts_data = []
        for i in range(nb):
            time.sleep(0.001)
            offset_ts = int(time.time() * self.scale)
            ts_data.append(offset_ts)
        for i in range(nb):
            data[i][self.int64_field_name] = ts_data[i]
        df = pd.DataFrame(data)
        for text_field in self.text_match_field_name_list:
            if text_field in df.columns:
                texts = df[text_field].tolist()
                wf = cf.analyze_documents(texts)
                self.word_freq.update(wf)

        # Debug: Check if struct array fields are present in generated data before insert
        if data and len(data) > 0:
            log.debug(f"[insert_data] First row keys: {list(data[0].keys())}")
            # Check for struct array fields (common names: struct_array, metadata, etc.)
            for key, value in data[0].items():
                if isinstance(value, list) and value and isinstance(value[0], dict):
                    log.debug(f"[insert_data] Found potential struct array field '{key}': {len(value)} items, first item: {value[0]}")

        try:
            res = self.milvus_client.insert(
                                             collection_name=self.c_name,
                                             data=data,
                                             partition_name=partition_name,
                                             timeout=timeout,
                                             enable_traceback=enable_traceback,
                                             check_task=CheckTasks.check_nothing)
            return res, True
        except Exception as e:
            return str(e), False

    def total(self):
        return self._succ + self._fail

    def succ_rate(self):
        return self._succ / self.total() if self.total() != 0 else 0

    def check_result(self):
        succ_rate = self.succ_rate()
        total = self.total()
        rsp_times = self.rsp_times
        average_time = 0 if len(rsp_times) == 0 else sum(
            rsp_times) / len(rsp_times)
        max_time = 0 if len(rsp_times) == 0 else max(rsp_times)
        min_time = 0 if len(rsp_times) == 0 else min(rsp_times)
        checker_name = self.__class__.__name__
        checkers_result = f"{checker_name}, succ_rate: {succ_rate:.2f}, total: {total:03d}, average_time: {average_time:.4f}, max_time: {max_time:.4f}, min_time: {min_time:.4f}"
        log.info(checkers_result)
        log.debug(f"{checker_name} rsp times: {self.rsp_times}")
        if len(self.fail_records) > 0:
            log.info(f"{checker_name} failed at {self.fail_records}")
        return checkers_result

    def terminate(self):
        self._keep_running = False
        self.reset()

    def pause(self):
        self._keep_running = False
        time.sleep(10)

    def resume(self):
        self._keep_running = True
        time.sleep(10)

    def reset(self):
        self._succ = 0
        self._fail = 0
        self.rsp_times = []
        self.fail_records = []
        self.average_time = 0

    def get_rto(self):
        if len(self.fail_records) == 0:
            return 0
        end = self.fail_records[-1][3]
        start = self.fail_records[0][3]
        recovery_time = end - start  # second
        self.recovery_time = recovery_time
        checker_name = self.__class__.__name__
        log.info(f"{checker_name} recovery time is {self.recovery_time}, start at {self.fail_records[0][2]}, "
                 f"end at {self.fail_records[-1][2]}")
        return recovery_time

    def prepare_bulk_insert_data(self,
                                 nb=constants.ENTITIES_FOR_BULKINSERT,
                                 file_type="npy",
                                 minio_endpoint="127.0.0.1:9000",
                                 bucket_name=None):
        schema = self.schema
        bucket_name = self.bucket_name if bucket_name is None else bucket_name
        log.info("prepare data for bulk insert")
        try:
            files = cf.prepare_bulk_insert_data(schema=schema,
                                                nb=nb,
                                                file_type=file_type,
                                                minio_endpoint=minio_endpoint,
                                                bucket_name=bucket_name)
            self.files = files
            return files, True
        except Exception as e:
            log.error(f"prepare data for bulk insert failed with error {e}")
            return [], False

    def do_bulk_insert(self):
        log.info(f"bulk insert collection name: {self.c_name}")
        from pymilvus import utility
        task_ids = utility.do_bulk_insert(collection_name=self.c_name, files=self.files, using=self.alias)
        log.info(f"task ids {task_ids}")
        completed = utility.wait_for_bulk_insert_tasks_completed(task_ids=[task_ids], timeout=720, using=self.alias)
        return task_ids, completed


class CollectionLoadChecker(Checker):
    """check collection load operations in a dependent thread"""

    def __init__(self, collection_name=None, shards_num=2, replica_number=1, schema=None, ):
        self.replica_number = replica_number
        if collection_name is None:
            collection_name = cf.gen_unique_str("CollectionLoadChecker_")
        super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)

    @trace()
    def load_collection(self):
        try:
            self.milvus_client.load_collection(collection_name=self.c_name, replica_number=self.replica_number)
            return None, True
        except Exception as e:
            return str(e), False

    @exception_handler()
    def run_task(self):
        res, result = self.load_collection()
        if result:
            self.milvus_client.release_collection(collection_name=self.c_name)
        return res, result

    def keep_running(self):
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP)


class CollectionReleaseChecker(Checker):
    """check collection release operations in a dependent thread"""

    def __init__(self, collection_name=None, shards_num=2, replica_number=1, schema=None, ):
        self.replica_number = replica_number
        if collection_name is None:
            collection_name = cf.gen_unique_str("CollectionReleaseChecker_")
        super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
        self.milvus_client.load_collection(collection_name=self.c_name, replica_number=self.replica_number)

    @trace()
    def release_collection(self):
        try:
            self.milvus_client.release_collection(collection_name=self.c_name)
            return None, True
        except Exception as e:
            return str(e), False

    @exception_handler()
    def run_task(self):
        res, result = self.release_collection()
        if result:
            self.milvus_client.load_collection(collection_name=self.c_name, replica_number=self.replica_number)
        return res, result

    def keep_running(self):
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP)



class CollectionRenameChecker(Checker):
    """check collection rename operations in a dependent thread"""

    def __init__(self, collection_name=None, shards_num=2, replica_number=1, schema=None, ):
        self.replica_number = replica_number
        if collection_name is None:
            collection_name = cf.gen_unique_str("CollectionRenameChecker_")
        super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)

    @trace()
    def rename_collection(self, old_collection_name, new_collection_name):
        try:
            self.milvus_client.rename_collection(old_name=old_collection_name, new_name=new_collection_name)
            return None, True
        except Exception as e:
            log.info(f"rename collection failed with error {e}")
            return str(e), False

    @exception_handler()
    def run_task(self):
        new_collection_name = "CollectionRenameChecker_" + cf.gen_unique_str("new_")
        res, result = self.rename_collection(self.c_name, new_collection_name)
        if result:
            result = self.milvus_client.has_collection(collection_name=new_collection_name)
            if result:
                self.c_name = new_collection_name
                data = cf.gen_row_data_by_schema(nb=1, schema=self.schema)
                self.milvus_client.insert(collection_name=new_collection_name, data=data)
        return res, result

    def keep_running(self):
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP)


class PartitionLoadChecker(Checker):
    """check partition load operations in a dependent thread"""

    def __init__(self, collection_name=None, shards_num=2, replica_number=1, schema=None, ):
        self.replica_number = replica_number
        if collection_name is None:
            collection_name = cf.gen_unique_str("PartitionLoadChecker_")
        p_name = cf.gen_unique_str("PartitionLoadChecker_")
        super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema, partition_name=p_name)
        self.milvus_client.release_collection(collection_name=self.c_name)

    @trace()
    def load_partition(self):
        try:
            self.milvus_client.load_partitions(collection_name=self.c_name, partition_names=[self.p_name], replica_number=self.replica_number)
            return None, True
        except Exception as e:
            return str(e), False

    @exception_handler()
    def run_task(self):
        res, result = self.load_partition()
        if result:
            self.milvus_client.release_partitions(collection_name=self.c_name, partition_names=[self.p_name])
        return res, result

    def keep_running(self):
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP)


class PartitionReleaseChecker(Checker):
    """check partition release operations in a dependent thread"""

    def __init__(self, collection_name=None, shards_num=2, replica_number=1, schema=None, ):
        self.replica_number = replica_number
        if collection_name is None:
            collection_name = cf.gen_unique_str("PartitionReleaseChecker_")
        p_name = cf.gen_unique_str("PartitionReleaseChecker_")
        super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema, partition_name=p_name)
        self.milvus_client.release_collection(collection_name=self.c_name)
        self.milvus_client.load_partitions(collection_name=self.c_name, partition_names=[self.p_name], replica_number=self.replica_number)

    @trace()
    def release_partition(self):
        try:
            self.milvus_client.release_partitions(collection_name=self.c_name, partition_names=[self.p_name])
            return None, True
        except Exception as e:
            return str(e), False

    @exception_handler()
    def run_task(self):
        res, result = self.release_partition()
        if result:
            self.milvus_client.load_partitions(collection_name=self.c_name, partition_names=[self.p_name], replica_number=self.replica_number)
        return res, result

    def keep_running(self):
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP)


class SearchChecker(Checker):
    """check search operations in a dependent thread"""

    def __init__(self, collection_name=None, shards_num=2, schema=None):
        if collection_name is None:
            collection_name = cf.gen_unique_str("SearchChecker_")
        super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
        self.insert_data()
        self.dense_anns_field_name_list = cf.get_dense_anns_field_name_list(self.schema)
        self.data = None
        self.anns_field_name = None
        self.search_param = None

    @trace()
    def search(self):
        try:
            res = self.milvus_client.search(
                collection_name=self.c_name,
                data=self.data,
                anns_field=self.anns_field_name,
                search_params=self.search_param,
                limit=5,
                partition_names=self.p_names,
                timeout=search_timeout
            )
            return res, True
        except Exception as e:
            return str(e), False

    @exception_handler()
    def run_task(self):
        anns_field_item = random.choice(self.dense_anns_field_name_list)
        self.anns_field_name = anns_field_item["name"]
        dim = anns_field_item["dim"]
        self.data = cf.gen_vectors(5, dim, vector_data_type=anns_field_item["dtype"])
        if anns_field_item["dtype"] in [DataType.FLOAT_VECTOR, DataType.FLOAT16_VECTOR, DataType.BFLOAT16_VECTOR]:
            self.search_param = constants.DEFAULT_SEARCH_PARAM
        elif anns_field_item["dtype"] == DataType.INT8_VECTOR:
            self.search_param = constants.DEFAULT_INT8_SEARCH_PARAM

        res, result = self.search()
        return res, result

    def keep_running(self):
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP / 10)

class TensorSearchChecker(Checker):
    """check search operations for struct array vector fields in a dependent thread"""

    def __init__(self, collection_name=None, shards_num=2, schema=None):
        if collection_name is None:
            collection_name = cf.gen_unique_str("TensorSearchChecker_")
        super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
        self.insert_data()
        # Only get struct array vector fields
        self.struct_array_vector_field_list = cf.get_struct_array_vector_field_list(self.schema)
        self.data = None
        self.anns_field_name = None
        self.search_param = None

    @staticmethod
    def _create_embedding_list(dim, num_vectors, dtype):
        """Create EmbeddingList for struct array vector search"""
        embedding_list = EmbeddingList()
        vectors = cf.gen_vectors(num_vectors, dim, vector_data_type=dtype)
        for vector in vectors:
            embedding_list.add(vector)
        return embedding_list

    @trace()
    def search(self):
        try:
            res = self.milvus_client.search(
                collection_name=self.c_name,
                data=self.data,
                anns_field=self.anns_field_name,
                search_params=self.search_param,
                limit=5,
                partition_names=self.p_names,
                timeout=search_timeout
            )
            return res, True
        except Exception as e:
            return str(e), False

    @exception_handler()
    def run_task(self):
        if not self.struct_array_vector_field_list:
            log.warning("No struct array vector fields available for search")
            return None, False

        # Randomly select a struct array vector field
        anns_field_item = random.choice(self.struct_array_vector_field_list)
        dim = anns_field_item["dim"]
        dtype = anns_field_item["dtype"]

        # Use the anns_field format: struct_field[vector_field]
        self.anns_field_name = anns_field_item["anns_field"]

        # Create EmbeddingList with random number of vectors (1-5)
        num_vectors = random.randint(1, 5)
        self.data = [self._create_embedding_list(dim, num_vectors, dtype)]

        # Use MAX_SIM_COSINE for struct array vector search
        self.search_param = {"metric_type": "MAX_SIM_COSINE"}

        res, result = self.search()
        return res, result

    def keep_running(self):
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP / 10)


class FullTextSearchChecker(Checker):
    """check full text search operations in a dependent thread"""

    def __init__(self, collection_name=None, shards_num=2, replica_number=1, schema=None, ):
        if collection_name is None:
            collection_name = cf.gen_unique_str("FullTextSearchChecker_")
        super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
        self.insert_data()

    @trace()
    def full_text_search(self):
        bm25_anns_field = random.choice(self.bm25_sparse_field_names)
        try:
            res = self.milvus_client.search(
                collection_name=self.c_name,
                data=cf.gen_vectors(5, self.dim, vector_data_type="TEXT_SPARSE_VECTOR"),
                anns_field=bm25_anns_field,
                search_params=constants.DEFAULT_BM25_SEARCH_PARAM,
                limit=5,
                partition_names=self.p_names,
                timeout=search_timeout
            )
            return res, True
        except Exception as e:
            return str(e), False

    @exception_handler()
    def run_task(self):
        res, result = self.full_text_search()
        return res, result

    def keep_running(self):
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP / 10)


class HybridSearchChecker(Checker):
    """check hybrid search operations in a dependent thread"""

    def __init__(self, collection_name=None, shards_num=2, replica_number=1, schema=None, ):
        if collection_name is None:
            collection_name = cf.gen_unique_str("HybridSearchChecker_")
        super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
        # do load before search
        self.milvus_client.load_collection(collection_name=self.c_name, replica_number=replica_number)
        self.insert_data()

    def gen_hybrid_search_request(self):
        res = []
        dim = self.dim
        for vec_field_name in self.float_vector_field_names:
            search_param = {
                "data": cf.gen_vectors(1, dim),
                "anns_field": vec_field_name,
                "param": constants.DEFAULT_SEARCH_PARAM,
                "limit": 10,
                "expr": f"{self.int64_field_name} > 0",
            }
            req = AnnSearchRequest(**search_param)
            res.append(req)
        return res

    @trace()
    def hybrid_search(self):
        try:
            res = self.milvus_client.hybrid_search(
                collection_name=self.c_name,
                reqs=self.gen_hybrid_search_request(),
                ranker=RRFRanker(),
                limit=10,
                partition_names=self.p_names,
                timeout=search_timeout
            )
            return res, True
        except Exception as e:
            log.info(f"hybrid search failed with error {e}")
            return str(e), False

    @exception_handler()
    def run_task(self):
        res, result = self.hybrid_search()
        return res, result

    def keep_running(self):
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP / 10)


class InsertFlushChecker(Checker):
    """check Insert and flush operations in a dependent thread"""

    def __init__(self, collection_name=None, flush=False, shards_num=2, schema=None):
        super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
        self._flush = flush
        stats = self.milvus_client.get_collection_stats(collection_name=self.c_name)
        self.initial_entities = stats.get("row_count", 0)

    def keep_running(self):
        while True:
            t0 = time.time()
            try:
                self.milvus_client.insert(
                    collection_name=self.c_name,
                    data=cf.gen_row_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.schema),
                    timeout=timeout
                )
                insert_result = True
            except Exception:
                insert_result = False
            t1 = time.time()
            if not self._flush:
                if insert_result:
                    self.rsp_times.append(t1 - t0)
                    self.average_time = ((t1 - t0) + self.average_time * self._succ) / (self._succ + 1)
                    self._succ += 1
                    log.debug(f"insert success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}")
                else:
                    self._fail += 1
                sleep(constants.WAIT_PER_OP / 10)
            else:
                # call flush to get num_entities
                t0 = time.time()
                self.milvus_client.flush(collection_names=[self.c_name])
                stats = self.milvus_client.get_collection_stats(collection_name=self.c_name)
                num_entities = stats.get("row_count", 0)
                t1 = time.time()
                if num_entities == (self.initial_entities + constants.DELTA_PER_INS):
                    self.rsp_times.append(t1 - t0)
                    self.average_time = ((t1 - t0) + self.average_time * self._succ) / (self._succ + 1)
                    self._succ += 1
                    log.debug(f"flush success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}")
                    self.initial_entities += constants.DELTA_PER_INS
                else:
                    self._fail += 1
                sleep(constants.WAIT_PER_OP * 6)


class FlushChecker(Checker):
    """check flush operations in a dependent thread"""

    def __init__(self, collection_name=None, shards_num=2, schema=None):
        if collection_name is None:
            collection_name = cf.gen_unique_str("FlushChecker_")
        super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
        stats = self.milvus_client.get_collection_stats(collection_name=self.c_name)
        self.initial_entities = stats.get("row_count", 0)

    @trace()
    def flush(self):
        try:
            self.milvus_client.flush(collection_name=self.c_name)
            return None, True
        except Exception as e:
            log.info(f"flush error: {e}")
            return str(e), False

    @exception_handler()
    def run_task(self):
        try:
            self.milvus_client.insert(
                collection_name=self.c_name,
                data=cf.gen_row_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.schema),
                timeout=timeout
            )
            result = True
        except Exception:
            result = False
        res, result = self.flush()
        return res, result

    def keep_running(self):
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP * 6)


class AddFieldChecker(Checker):
    """check add field operations in a dependent thread"""

    def __init__(self, collection_name=None, shards_num=2, schema=None):
        if collection_name is None:
            collection_name = cf.gen_unique_str("AddFieldChecker_")
        super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
        stats = self.milvus_client.get_collection_stats(collection_name=self.c_name)
        self.initial_entities = stats.get("row_count", 0)

    @trace()
    def add_field(self):
        try:
            new_field_name = cf.gen_unique_str("new_field_")
            self.milvus_client.add_collection_field(collection_name=self.c_name,
                                                    field_name=new_field_name,
                                                    data_type=DataType.INT64,
                                                    nullable=True)
            log.debug(f"add field {new_field_name} to collection {self.c_name}")
            time.sleep(1)
            _, result = self.insert_data()
            res = self.milvus_client.query(collection_name=self.c_name,
                                           filter=f"{new_field_name} >= 0",
                                           output_fields=[new_field_name])
            result = True
            if result:
                log.debug(f"query with field {new_field_name} success")
            return None, result
        except Exception as e:
            log.error(e)
            return str(e), False

    @exception_handler()
    def run_task(self):
        res, result = self.add_field()
        return res, result

    def keep_running(self):
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP * 6)



class InsertChecker(Checker):
    """check insert operations in a dependent thread"""

    def __init__(self, collection_name=None, flush=False, shards_num=2, schema=None):
        if collection_name is None:
            collection_name = cf.gen_unique_str("InsertChecker_")
        super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
        self._flush = flush
        stats = self.milvus_client.get_collection_stats(collection_name=self.c_name)
        self.initial_entities = stats.get("row_count", 0)
        self.inserted_data = []
        self.scale = 1 * 10 ** 6
        self.start_time_stamp = int(time.time() * self.scale)  # us
        self.term_expr = f'{self.int64_field_name} >= {self.start_time_stamp}'
        self.file_name = f"/tmp/ci_logs/insert_data_{uuid.uuid4()}.parquet"

    @trace()
    def insert_entities(self):
        # Use describe_collection directly to preserve struct_fields information
        schema = self.milvus_client.describe_collection(self.c_name)
        data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=schema)
        rows = len(data)
        ts_data = []
        for i in range(constants.DELTA_PER_INS):
            time.sleep(0.001)
            offset_ts = int(time.time() * self.scale)
            ts_data.append(offset_ts)

        for i in range(rows):
            data[i][self.int64_field_name] = ts_data[i]

        log.debug(f"insert data: {rows}")
        # Debug: Check if struct array fields are present in generated data
        if data and len(data) > 0:
            log.debug(f"[InsertChecker] First row keys: {list(data[0].keys())}")
            # Check for struct array fields (common names: struct_array, metadata, etc.)
            for key, value in data[0].items():
                if isinstance(value, list) and value and isinstance(value[0], dict):
                    log.debug(f"[InsertChecker] Found potential struct array field '{key}': {len(value)} items, first item: {value[0]}")

        try:
            res = self.milvus_client.insert(collection_name=self.c_name,
                                           data=data,
                                           partition_name=self.p_names[0] if self.p_names else None,
                                           timeout=timeout)
            return res, True
        except Exception as e:
            log.info(f"insert error: {e}")
            return str(e), False

    @exception_handler()
    def run_task(self):

        res, result = self.insert_entities()
        return res, result

    def keep_running(self):
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP / 10)

    def verify_data_completeness(self):
        # deprecated
        try:
            index_params = create_index_params_from_dict(self.float_vector_field_name, constants.DEFAULT_INDEX_PARAM)
            self.milvus_client.create_index(collection_name=self.c_name, index_params=index_params)
        except Exception as e:
            log.error(f"create index error: {e}")
        self.milvus_client.load_collection(collection_name=self.c_name)
        end_time_stamp = int(time.time() * self.scale)
        self.term_expr = f'{self.int64_field_name} >= {self.start_time_stamp} and ' \
                         f'{self.int64_field_name} <= {end_time_stamp}'
        data_in_client = []
        for d in self.inserted_data:
            if self.start_time_stamp <= d <= end_time_stamp:
                data_in_client.append(d)
        res = self.milvus_client.query(collection_name=self.c_name, filter=self.term_expr,
                                      output_fields=[f'{self.int64_field_name}'],
                                      limit=len(data_in_client) * 2, timeout=timeout)
        result = True

        data_in_server = []
        for r in res:
            d = r[f"{ct.default_int64_field_name}"]
            data_in_server.append(d)
        pytest.assume(set(data_in_server) == set(data_in_client))


class InsertFreshnessChecker(Checker):
    """check insert freshness operations in a dependent thread"""

    def __init__(self, collection_name=None, flush=False, shards_num=2, schema=None):
        self.latest_data = None
        if collection_name is None:
            collection_name = cf.gen_unique_str("InsertChecker_")
        super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
        self._flush = flush
        stats = self.milvus_client.get_collection_stats(collection_name=self.c_name)
        self.initial_entities = stats.get("row_count", 0)
        self.inserted_data = []
        self.scale = 1 * 10 ** 6
        self.start_time_stamp = int(time.time() * self.scale)  # us
        self.term_expr = f'{self.int64_field_name} >= {self.start_time_stamp}'
        self.file_name = f"/tmp/ci_logs/insert_data_{uuid.uuid4()}.parquet"

    def insert_entities(self):
        schema = self.get_schema()
        data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=schema)
        ts_data = []
        for i in range(constants.DELTA_PER_INS):
            time.sleep(0.001)
            offset_ts = int(time.time() * self.scale)
            ts_data.append(offset_ts)

        data[0] = ts_data  # set timestamp (ms) as int64
        log.debug(f"insert data: {len(ts_data)}")
        try:
            res = self.milvus_client.insert(collection_name=self.c_name,
                                           data=data,
                                           partition_name=self.p_names[0] if self.p_names else None,
                                           timeout=timeout)
            result = True
        except Exception as e:
            res = str(e)
            result = False
        self.latest_data = ts_data[-1]
        self.term_expr = f'{self.int64_field_name} == {self.latest_data}'
        return res, result

    @trace()
    def insert_freshness(self):
        while True:
            try:
                res = self.milvus_client.query(collection_name=self.c_name,
                                              filter=self.term_expr,
                                              output_fields=[f'{self.int64_field_name}'],
                                              timeout=timeout)
                result = True
            except Exception as e:
                res = str(e)
                result = False
                break
            if len(res) == 1 and res[0][f"{self.int64_field_name}"] == self.latest_data:
                break
        return res, result

    @exception_handler()
    def run_task(self):
        res, result = self.insert_entities()
        res, result = self.insert_freshness()
        return res, result

    def keep_running(self):
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP / 10)


class UpsertChecker(Checker):
    """check upsert operations in a dependent thread"""

    def __init__(self, collection_name=None, shards_num=2, schema=None):
        if collection_name is None:
            collection_name = cf.gen_unique_str("UpsertChecker_")
        super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
        schema = self.milvus_client.describe_collection(collection_name=self.c_name)
        self.data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=schema)

    @trace()
    def upsert_entities(self):
        try:
            res = self.milvus_client.upsert(collection_name=self.c_name,
                                           data=self.data,
                                           timeout=timeout)
            return res, True
        except Exception as e:
            log.info(f"upsert failed: {e}")
            return str(e), False

    @exception_handler()
    def run_task(self):
        # half of the data is upsert, the other half is insert
        rows = len(self.data)
        pk_old = [d[self.int64_field_name] for d in self.data[:rows // 2]]
        schema = self.milvus_client.describe_collection(collection_name=self.c_name)
        self.data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=schema)
        pk_new = [d[self.int64_field_name] for d in self.data[rows // 2:]]
        pk_update = pk_old + pk_new
        for i in range(rows):
            self.data[i][self.int64_field_name] = pk_update[i]
        res, result = self.upsert_entities()
        return res, result

    def keep_running(self):
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP * 6)


class UpsertFreshnessChecker(Checker):
    """check upsert freshness operations in a dependent thread"""

    def __init__(self, collection_name=None, shards_num=2, schema=None):
        self.term_expr = None
        self.latest_data = None
        if collection_name is None:
            collection_name = cf.gen_unique_str("UpsertChecker_")
        super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
        schema = self.get_schema()
        self.data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=schema)

    def upsert_entities(self):
        try:
            res = self.milvus_client.upsert(collection_name=self.c_name,
                                           data=self.data,
                                           timeout=timeout)
            return res, True
        except Exception as e:
            return str(e), False

    @trace()
    def upsert_freshness(self):
        while True:
            try:
                res = self.milvus_client.query(collection_name=self.c_name,
                                              filter=self.term_expr,
                                              output_fields=[f'{self.int64_field_name}'],
                                              timeout=timeout)
                result = True
            except Exception as e:
                res = str(e)
                result = False
                break
            if len(res) == 1 and res[0][f"{self.int64_field_name}"] == self.latest_data:
                break
        return res, result

    @exception_handler()
    def run_task(self):
        # half of the data is upsert, the other half is insert
        rows = len(self.data[0])
        pk_old = self.data[0][:rows // 2]
        schema = self.get_schema()
        self.data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=schema)
        pk_new = self.data[0][rows // 2:]
        pk_update = pk_old + pk_new
        self.data[0] = pk_update
        self.latest_data = self.data[0][-1]
        self.term_expr = f'{self.int64_field_name} == {self.latest_data}'
        res, result = self.upsert_entities()
        res, result = self.upsert_freshness()
        return res, result

    def keep_running(self):
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP * 6)
    
class PartialUpdateChecker(Checker):
    """check partial update operations in a dependent thread"""

    def __init__(self, collection_name=None, shards_num=2, schema=None):
        if collection_name is None:
            collection_name = cf.gen_unique_str("PartialUpdateChecker_")
        super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema, enable_struct_array_field=False)
        schema = self.get_schema()
        self.data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=schema)

    @trace()
    def partial_update_entities(self):
        try:

            res = self.milvus_client.upsert(collection_name=self.c_name,
                                           data=self.data,
                                           partial_update=True,
                                           timeout=timeout)
            return res, True
        except Exception as e:
            log.info(f"error {e}")
            return str(e), False

    @exception_handler()
    def run_task(self, count=0):

        schema = self.get_schema()
        pk_field_name = self.int64_field_name
        rows = len(self.data)

        # if count is even, use partial update; if count is odd, use full insert
        if count % 2 == 0:
            # Generate a fresh full batch (used for inserts and as a source of values)
            full_rows = cf.gen_row_data_by_schema(nb=rows, schema=schema)
            self.data = full_rows
        else:
            num_fields = len(schema["fields"])
            # Choose subset fields to update: always include PK + one non-PK field if available
            num = count % num_fields
            desired_fields = [pk_field_name, schema["fields"][num if num != 0 else 1]["name"]]
            partial_rows = cf.gen_row_data_by_schema(nb=rows, schema=schema,
                                                    desired_field_names=desired_fields)
            self.data = partial_rows
        res, result = self.partial_update_entities()
        return res, result

    def keep_running(self):
        count = 0
        while self._keep_running:
            self.run_task(count)
            count += 1
            sleep(constants.WAIT_PER_OP * 6)


class CollectionCreateChecker(Checker):
    """check collection create operations in a dependent thread"""

    def __init__(self, collection_name=None, schema=None):
        if collection_name is None:
            collection_name = cf.gen_unique_str("CreateChecker_")
        super().__init__(collection_name=collection_name, schema=schema)

    @trace()
    def init_collection(self):
        try:
            collection_name = cf.gen_unique_str("CreateChecker_")
            schema = cf.gen_default_collection_schema()
            self.milvus_client.create_collection(collection_name=collection_name,
                                                schema=schema)
            return None, True
        except Exception as e:
            return str(e), False

    @exception_handler()
    def run_task(self):
        res, result = self.init_collection()
        # if result:
        #     # 50% chance to drop collection
        #     if random.randint(0, 1) == 0:
        #         self.c_wrap.drop(timeout=timeout)
        return res, result

    def keep_running(self):
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP)


class CollectionDropChecker(Checker):
    """check collection drop operations in a dependent thread"""

    def __init__(self, collection_name=None, schema=None):
        if collection_name is None:
            collection_name = cf.gen_unique_str("DropChecker_")
        super().__init__(collection_name=collection_name, schema=schema)
        self.collection_pool = []
        self.gen_collection_pool(schema=self.schema)

    def gen_collection_pool(self, pool_size=50, schema=None):
        for i in range(pool_size):
            collection_name = cf.gen_unique_str("DropChecker_")
            try:
                self.milvus_client.create_collection(collection_name=collection_name, schema=schema)
                self.collection_pool.append(collection_name)
            except Exception as e:
                log.error(f"Failed to create collection {collection_name}: {e}")

    @trace()
    def drop_collection(self):
        try:
            self.milvus_client.drop_collection(collection_name=self.c_name)
            if self.c_name in self.collection_pool:
                self.collection_pool.remove(self.c_name)
            return None, True
        except Exception as e:
            log.info(f"error while dropping collection {self.c_name}: {e}")
            return str(e), False

    @exception_handler()
    def run_task(self):
        res, result = self.drop_collection()
        return res, result

    def keep_running(self):
        while self._keep_running:
            res, result = self.run_task()
            if result:
                try:
                    if len(self.collection_pool) <= 10:
                        self.gen_collection_pool(schema=self.schema)
                except Exception as e:
                    log.error(f"Failed to generate collection pool: {e}")
                try:
                    c_name = self.collection_pool[0]
                    # Update current collection name to use from pool
                    self.c_name = c_name
                except Exception as e:
                    log.error(f"Failed to init new collection: {e}")
            sleep(constants.WAIT_PER_OP)


class PartitionCreateChecker(Checker):
    """check partition create operations in a dependent thread"""

    def __init__(self, collection_name=None, schema=None, partition_name=None):
        if collection_name is None:
            collection_name = cf.gen_unique_str("PartitionCreateChecker_")
        super().__init__(collection_name=collection_name, schema=schema, partition_name=partition_name)
        c_name = cf.gen_unique_str("PartitionDropChecker_")
        self.milvus_client.create_collection(collection_name=c_name, schema=self.schema)
        self.c_name = c_name
        log.info(f"collection {c_name} created")
        p_name = cf.gen_unique_str("PartitionDropChecker_")
        self.milvus_client.create_partition(collection_name=self.c_name, partition_name=p_name)
        self.p_name = p_name
        log.info(f"partition: {self.p_name}")

    @trace()
    def create_partition(self):
        try:
            partition_name = cf.gen_unique_str("PartitionCreateChecker_")
            self.milvus_client.create_partition(collection_name=self.c_name,
                                               partition_name=partition_name)
            return None, True
        except Exception as e:
            return str(e), False

    @exception_handler()
    def run_task(self):
        res, result = self.create_partition()
        return res, result

    def keep_running(self):
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP)


class PartitionDropChecker(Checker):
    """check partition drop operations in a dependent thread"""

    def __init__(self, collection_name=None, schema=None, partition_name=None):
        if collection_name is None:
            collection_name = cf.gen_unique_str("PartitionDropChecker_")
        super().__init__(collection_name=collection_name, schema=schema, partition_name=partition_name)
        c_name = cf.gen_unique_str("PartitionDropChecker_")
        self.milvus_client.create_collection(collection_name=c_name, schema=self.schema)
        self.c_name = c_name
        log.info(f"collection {c_name} created")
        p_name = cf.gen_unique_str("PartitionDropChecker_")
        self.milvus_client.create_partition(collection_name=self.c_name, partition_name=p_name)
        self.p_name = p_name
        log.info(f"partition: {self.p_name}")

    @trace()
    def drop_partition(self):
        try:
            self.milvus_client.drop_partition(collection_name=self.c_name, partition_name=self.p_name)
            return None, True
        except Exception as e:
            return str(e), False

    @exception_handler()
    def run_task(self):
        res, result = self.drop_partition()
        if result:
            # create two partition then drop one
            for i in range(2):
                p_name = cf.gen_unique_str("PartitionDropChecker_")
                self.milvus_client.create_partition(collection_name=self.c_name, partition_name=p_name)
                if i == 1:  # Keep track of the last partition to drop next time
                    self.p_name = p_name
        return res, result

    def keep_running(self):
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP)


class DatabaseCreateChecker(Checker):
    """check create database operations in a dependent thread"""

    def __init__(self, collection_name=None, schema=None):
        if collection_name is None:
            collection_name = cf.gen_unique_str("DatabaseChecker_")
        super().__init__(collection_name=collection_name, schema=schema)
        self.db_name = None

    @trace()
    def init_db(self):
        db_name = cf.gen_unique_str("db_")
        try:
            self.milvus_client.create_database(db_name=db_name)
            self.db_name = db_name
            return None, True
        except Exception as e:
            return str(e), False

    @exception_handler()
    def run_task(self):
        res, result = self.init_db()
        if result:
            self.milvus_client.drop_database(db_name=self.db_name)
        return res, result

    def keep_running(self):
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP)


class DatabaseDropChecker(Checker):
    """check drop database operations in a dependent thread"""

    def __init__(self, collection_name=None, schema=None):
        if collection_name is None:
            collection_name = cf.gen_unique_str("DatabaseChecker_")
        super().__init__(collection_name=collection_name, schema=schema)
        self.db_name = cf.gen_unique_str("db_")
        self.milvus_client.create_database(db_name=self.db_name)

    @trace()
    def drop_db(self):
        try:
            self.milvus_client.drop_database(db_name=self.db_name)
            return None, True
        except Exception as e:
            return str(e), False

    @exception_handler()
    def run_task(self):
        res, result = self.drop_db()
        if result:
            self.db_name = cf.gen_unique_str("db_")
            self.milvus_client.create_database(db_name=self.db_name)
        return res, result

    def keep_running(self):
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP)


class IndexCreateChecker(Checker):
    """check index create operations in a dependent thread"""

    def __init__(self, collection_name=None, schema=None):
        if collection_name is None:
            collection_name = cf.gen_unique_str("IndexChecker_")
        super().__init__(collection_name=collection_name, schema=schema)
        for i in range(5):
            self.milvus_client.insert(collection_name=self.c_name,
                                     data=cf.gen_row_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.schema),
                                     timeout=timeout)
        # do as a flush before indexing
        stats = self.milvus_client.get_collection_stats(collection_name=self.c_name)
        log.debug(f"Index ready entities: {stats.get('row_count', 0)}")

    @trace()
    def create_index(self):
        try:
            index_params = create_index_params_from_dict(self.float_vector_field_name, constants.DEFAULT_INDEX_PARAM)
            self.milvus_client.create_index(collection_name=self.c_name,
                                           index_params=index_params)
            return None, True
        except Exception as e:
            return str(e), False

    @exception_handler()
    def run_task(self):
        c_name = cf.gen_unique_str("IndexCreateChecker_")
        self.milvus_client.create_collection(collection_name=c_name, schema=self.schema)
        self.c_name = c_name
        res, result = self.create_index()
        if result:
            self.milvus_client.drop_index(collection_name=self.c_name, index_name="")
        return res, result

    def keep_running(self):
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP * 6)


class IndexDropChecker(Checker):
    """check index drop operations in a dependent thread"""

    def __init__(self, collection_name=None, schema=None):
        if collection_name is None:
            collection_name = cf.gen_unique_str("IndexChecker_")
        super().__init__(collection_name=collection_name, schema=schema)
        for i in range(5):
            self.milvus_client.insert(collection_name=self.c_name, data=cf.gen_row_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.schema),
                               timeout=timeout)
        # do as a flush before indexing
        stats = self.milvus_client.get_collection_stats(collection_name=self.c_name)
        log.debug(f"Index ready entities: {stats.get('row_count', 0)}")

    @trace()
    def drop_index(self):
        try:
            res = self.milvus_client.drop_index(collection_name=self.c_name, index_name="")
            return res, True
        except Exception as e:
            log.info(f"drop_index error: {e}")
            return str(e), False

    @exception_handler()
    def run_task(self):
        res, result = self.drop_index()
        if result:
            self.milvus_client.create_collection(collection_name=cf.gen_unique_str("IndexDropChecker_"), schema=self.schema)
            index_params = create_index_params_from_dict(self.float_vector_field_name, constants.DEFAULT_INDEX_PARAM)
            self.milvus_client.create_index(collection_name=self.c_name, index_params=index_params)
        return res, result

    def keep_running(self):
        while self._keep_running:
            self.milvus_client.create_collection(collection_name=cf.gen_unique_str("IndexDropChecker_"), schema=self.schema)
            index_params = create_index_params_from_dict(self.float_vector_field_name, constants.DEFAULT_INDEX_PARAM)
            self.milvus_client.create_index(collection_name=self.c_name, index_params=index_params)
            self.run_task()
            sleep(constants.WAIT_PER_OP * 6)


class QueryChecker(Checker):
    """check query operations in a dependent thread"""

    def __init__(self, collection_name=None, shards_num=2, replica_number=1, schema=None):
        if collection_name is None:
            collection_name = cf.gen_unique_str("QueryChecker_")
        super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
        index_params = create_index_params_from_dict(self.float_vector_field_name, constants.DEFAULT_INDEX_PARAM)
        self.milvus_client.create_index(collection_name=self.c_name, index_params=index_params)
        self.milvus_client.load_collection(collection_name=self.c_name, replica_number=replica_number)  # do load before query
        self.insert_data()
        self.term_expr = f"{self.int64_field_name} > 0"

    @trace()
    def query(self):
        try:
            res = self.milvus_client.query(collection_name=self.c_name, filter=self.term_expr, limit=5, timeout=query_timeout)
            return res, True
        except Exception as e:
            log.info(f"query error: {e}")
            return str(e), False

    @exception_handler()
    def run_task(self):
        res, result = self.query()
        return res, result

    def keep_running(self):
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP / 10)


class TextMatchChecker(Checker):
    """check text match query operations in a dependent thread"""

    def __init__(self, collection_name=None, shards_num=2, replica_number=1, schema=None):
        if collection_name is None:
            collection_name = cf.gen_unique_str("QueryChecker_")
        super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
        index_params = create_index_params_from_dict(self.float_vector_field_name, constants.DEFAULT_INDEX_PARAM)
        self.milvus_client.create_index(collection_name=self.c_name, index_params=index_params)
        self.milvus_client.load_collection(collection_name=self.c_name, replica_number=replica_number)  # do load before query
        self.insert_data()
        key_word = self.word_freq.most_common(1)[0][0]
        text_match_field_name = random.choice(self.text_match_field_name_list)
        self.term_expr = f"TEXT_MATCH({text_match_field_name}, '{key_word}')"

    @trace()
    def text_match(self):
        try:
            res = self.milvus_client.query(collection_name=self.c_name, filter=self.term_expr, limit=5, timeout=query_timeout)
            return res, True
        except Exception as e:
            log.info(f"text_match error: {e}")
            return str(e), False

    @exception_handler()
    def run_task(self):
        key_word = self.word_freq.most_common(1)[0][0]
        text_match_field_name = random.choice(self.text_match_field_name_list)
        self.term_expr = f"TEXT_MATCH({text_match_field_name}, '{key_word}')"
        res, result = self.text_match()
        return res, result

    def keep_running(self):
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP / 10)


class PhraseMatchChecker(Checker):
    """check phrase match query operations in a dependent thread"""

    def __init__(self, collection_name=None, shards_num=2, replica_number=1, schema=None):
        if collection_name is None:
            collection_name = cf.gen_unique_str("PhraseMatchChecker_")
        super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
        index_params = create_index_params_from_dict(self.float_vector_field_name, constants.DEFAULT_INDEX_PARAM)
        self.milvus_client.create_index(collection_name=self.c_name, index_params=index_params)
        self.milvus_client.load_collection(collection_name=self.c_name, replica_number=replica_number)  # do load before query
        self.insert_data()
        key_word_1 = self.word_freq.most_common(2)[0][0]
        key_word_2 = self.word_freq.most_common(2)[1][0]
        slop=5
        text_match_field_name = random.choice(self.text_match_field_name_list)
        self.term_expr = f"PHRASE_MATCH({text_match_field_name}, '{key_word_1} {key_word_2}', {slop})"

    @trace()
    def phrase_match(self):
        try:
            res = self.milvus_client.query(collection_name=self.c_name, filter=self.term_expr, limit=5, timeout=query_timeout)
            return res, True
        except Exception as e:
            log.info(f"phrase_match error: {e}")
            return str(e), False

    @exception_handler()
    def run_task(self):
        key_word_1 = self.word_freq.most_common(2)[0][0]
        key_word_2 = self.word_freq.most_common(2)[1][0]
        slop=5
        text_match_field_name = random.choice(self.text_match_field_name_list)
        self.term_expr = f"PHRASE_MATCH({text_match_field_name}, '{key_word_1} {key_word_2}', {slop})"
        res, result = self.phrase_match()
        return res, result

    def keep_running(self):
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP / 10)


class JsonQueryChecker(Checker):
    """check json query operations in a dependent thread"""

    def __init__(self, collection_name=None, shards_num=2, replica_number=1, schema=None):
        if collection_name is None:
            collection_name = cf.gen_unique_str("JsonQueryChecker_")
        super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
        index_params = create_index_params_from_dict(self.float_vector_field_name, constants.DEFAULT_INDEX_PARAM)
        self.milvus_client.create_index(collection_name=self.c_name, index_params=index_params)
        self.milvus_client.load_collection(collection_name=self.c_name, replica_number=replica_number)  # do load before query
        self.insert_data()
        self.term_expr = self.get_term_expr()

    def get_term_expr(self):
        json_field_name = random.choice(self.json_field_names)
        fake = Faker()
        address_list = [fake.address() for _ in range(10)]
        name_list = [fake.name() for _ in range(10)]
        number_list = [random.randint(0, 100) for _ in range(10)]
        path = random.choice([ "name", "count"])
        path_value = {
            "address": address_list, # TODO not used in json query because of issue
            "name": name_list,
            "count": number_list
        }
        return f"{json_field_name}['{path}'] <= '{path_value[path][random.randint(0, len(path_value[path]) - 1)]}'"
        

    @trace()
    def json_query(self):
        try:
            res = self.milvus_client.query(collection_name=self.c_name, filter=self.term_expr, limit=5, timeout=query_timeout)
            return res, True
        except Exception as e:
            log.info(f"json_query error: {e}")
            return str(e), False

    @exception_handler()
    def run_task(self):
        self.term_expr = self.get_term_expr()
        res, result = self.json_query()
        return res, result

    def keep_running(self):
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP / 10)

class GeoQueryChecker(Checker):
    """check geometry query operations in a dependent thread"""

    def __init__(self, collection_name=None, shards_num=2, replica_number=1, schema=None):
        if collection_name is None:
            collection_name = cf.gen_unique_str("GeoQueryChecker_")
        super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
        index_params = create_index_params_from_dict(self.float_vector_field_name, constants.DEFAULT_INDEX_PARAM)
        self.milvus_client.create_index(collection_name=self.c_name, index_params=index_params)
        self.milvus_client.load_collection(collection_name=self.c_name, replica_number=replica_number)  # do load before query
        self.insert_data()
        self.term_expr = self.get_term_expr()

    def get_term_expr(self):
        geometry_field_name = random.choice(self.geometry_field_names)
        query_polygon = "POLYGON ((-180 -90, 180 -90, 180 90, -180 90, -180 -90))"
        return f"ST_WITHIN({geometry_field_name}, '{query_polygon}')"
        

    @trace()
    def geo_query(self):
        try:
            res = self.milvus_client.query(collection_name=self.c_name, filter=self.term_expr, limit=5, timeout=query_timeout)
            return res, True
        except Exception as e:
            log.info(f"geo_query error: {e}")
            return str(e), False

    @exception_handler()
    def run_task(self):
        self.term_expr = self.get_term_expr()
        res, result = self.geo_query()
        return res, result

    def keep_running(self):
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP / 10)


class DeleteChecker(Checker):
    """check delete operations in a dependent thread"""

    def __init__(self, collection_name=None, schema=None, shards_num=2):
        if collection_name is None:
            collection_name = cf.gen_unique_str("DeleteChecker_")
        super().__init__(collection_name=collection_name, schema=schema, shards_num=shards_num)
        index_params = create_index_params_from_dict(self.float_vector_field_name, constants.DEFAULT_INDEX_PARAM)
        self.milvus_client.create_index(collection_name=self.c_name, index_params=index_params)
        self.milvus_client.load_collection(collection_name=self.c_name)  # load before query
        self.insert_data()
        query_expr = f'{self.int64_field_name} > 0'
        res = self.milvus_client.query(collection_name=self.c_name, filter=query_expr, output_fields=[self.int64_field_name], partition_name=self.p_name)
        self.ids = [r[self.int64_field_name] for r in res]
        self.query_expr = query_expr
        delete_ids = self.ids[:len(self.ids) // 2]  # delete half of ids
        self.delete_expr = f'{self.int64_field_name} in {delete_ids}'

    def update_delete_expr(self):
        res = self.milvus_client.query(collection_name=self.c_name, filter=self.query_expr, output_fields=[self.int64_field_name], partition_name=self.p_name)
        all_ids = [r[self.int64_field_name] for r in res]
        if len(all_ids) < 100:
            # insert data to make sure there are enough ids to delete
            self.insert_data(nb=10000)
            res = self.milvus_client.query(collection_name=self.c_name, filter=self.query_expr, output_fields=[self.int64_field_name], partition_name=self.p_name)
            all_ids = [r[self.int64_field_name] for r in res]
        delete_ids = all_ids[:3000]  # delete 3000 ids
        self.delete_expr = f'{self.int64_field_name} in {delete_ids}'

    @trace()
    def delete_entities(self):
        try:
            res = self.milvus_client.delete(collection_name=self.c_name, filter=self.delete_expr, timeout=timeout, partition_name=self.p_name)
            return res, True
        except Exception as e:
            log.info(f"delete_entities error: {e}")
            return str(e), False

    @exception_handler()
    def run_task(self):
        self.update_delete_expr()
        res, result = self.delete_entities()
        return res, result

    def keep_running(self):
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP)


class DeleteFreshnessChecker(Checker):
    """check delete freshness operations in a dependent thread"""

    def __init__(self, collection_name=None, schema=None):
        if collection_name is None:
            collection_name = cf.gen_unique_str("DeleteChecker_")
        super().__init__(collection_name=collection_name, schema=schema)
        index_params = create_index_params_from_dict(self.float_vector_field_name, constants.DEFAULT_INDEX_PARAM)
        self.milvus_client.create_index(collection_name=self.c_name, index_params=index_params)
        self.milvus_client.load_collection(collection_name=self.c_name)  # load before query
        self.insert_data()
        query_expr = f'{self.int64_field_name} > 0'
        res = self.milvus_client.query(collection_name=self.c_name, filter=query_expr, output_fields=[self.int64_field_name], partition_name=self.p_name)
        self.ids = [r[self.int64_field_name] for r in res]
        self.query_expr = query_expr
        delete_ids = self.ids[:len(self.ids) // 2]  # delete half of ids
        self.delete_expr = f'{self.int64_field_name} in {delete_ids}'

    def update_delete_expr(self):
        res = self.milvus_client.query(collection_name=self.c_name, filter=self.query_expr, output_fields=[self.int64_field_name], partition_name=self.p_name)
        all_ids = [r[self.int64_field_name] for r in res]
        if len(all_ids) < 100:
            # insert data to make sure there are enough ids to delete
            self.insert_data(nb=10000)
            res = self.milvus_client.query(collection_name=self.c_name, filter=self.query_expr, output_fields=[self.int64_field_name], partition_name=self.p_name)
            all_ids = [r[self.int64_field_name] for r in res]
        delete_ids = all_ids[:len(all_ids) // 2]  # delete half of ids
        self.delete_expr = f'{self.int64_field_name} in {delete_ids}'

    def delete_entities(self):
        try:
            res = self.milvus_client.delete(collection_name=self.c_name, filter=self.delete_expr, timeout=timeout, partition_name=self.p_name)
            return res, True
        except Exception as e:
            log.info(f"delete_entities error: {e}")
            return str(e), False

    @trace()
    def delete_freshness(self):
        try:
            while True:
                res = self.milvus_client.query(collection_name=self.c_name, filter=self.delete_expr, output_fields=[f'{self.int64_field_name}'], timeout=timeout)
                if len(res) == 0:
                    break
            return res, True
        except Exception as e:
            log.info(f"delete_freshness error: {e}")
            return str(e), False

    @exception_handler()
    def run_task(self):
        self.update_delete_expr()
        res, result = self.delete_entities()
        res, result = self.delete_freshness()

        return res, result

    def keep_running(self):
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP)


class CompactChecker(Checker):
    """check compact operations in a dependent thread"""

    def __init__(self, collection_name=None, schema=None):
        if collection_name is None:
            collection_name = cf.gen_unique_str("CompactChecker_")
        super().__init__(collection_name=collection_name, schema=schema)
        index_params = create_index_params_from_dict(self.float_vector_field_name, constants.DEFAULT_INDEX_PARAM)
        self.milvus_client.create_index(collection_name=self.c_name, index_params=index_params)
        self.milvus_client.load_collection(collection_name=self.c_name)  # load before compact

    @trace()
    def compact(self):
        from pymilvus import Collection
        collection = Collection(name=self.c_name, using=self.alias)
        res = collection.compact(timeout=timeout)
        collection.wait_for_compaction_completed()
        collection.get_compaction_plans()
        return res, True

    @exception_handler()
    def run_task(self):
        res, result = self.compact()
        return res, result

    def keep_running(self):
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP / 10)


class LoadBalanceChecker(Checker):
    """check load balance operations in a dependent thread"""

    def __init__(self, collection_name=None, schema=None):
        if collection_name is None:
            collection_name = cf.gen_unique_str("LoadBalanceChecker_")
        super().__init__(collection_name=collection_name, schema=schema)
        index_params = create_index_params_from_dict(self.float_vector_field_name, constants.DEFAULT_INDEX_PARAM)
        self.milvus_client.create_index(collection_name=self.c_name, index_params=index_params)
        self.milvus_client.load_collection(collection_name=self.c_name)
        self.sealed_segment_ids = None
        self.dst_node_ids = None
        self.src_node_id = None

    @trace()
    def load_balance(self):
        from pymilvus import utility
        res = utility.load_balance(collection_name=self.c_name, src_node_id=self.src_node_id, dst_node_ids=self.dst_node_ids, sealed_segment_ids=self.sealed_segment_ids, using=self.alias)
        return res, True

    def prepare(self):
        """prepare load balance params"""
        from pymilvus import Collection
        collection = Collection(name=self.c_name, using=self.alias)
        res = collection.get_replicas()
        # find a group which has multi nodes
        group_nodes = []
        for g in res.groups:
            if len(g.group_nodes) >= 2:
                group_nodes = list(g.group_nodes)
                break
        self.src_node_id = group_nodes[0]
        self.dst_node_ids = group_nodes[1:]
        from pymilvus import utility
        res = utility.get_query_segment_info(self.c_name, using=self.alias)
        segment_distribution = cf.get_segment_distribution(res)
        self.sealed_segment_ids = segment_distribution[self.src_node_id]["sealed"]

    @exception_handler()
    def run_task(self):
        self.prepare()
        res, result = self.load_balance()
        return res, result

    def keep_running(self):
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP / 10)


class BulkInsertChecker(Checker):
    """check bulk insert operations in a dependent thread"""

    def __init__(self, collection_name=None, files=[], use_one_collection=False, dim=ct.default_dim,
                 schema=None, insert_data=False, minio_endpoint=None, bucket_name=None):
        if collection_name is None:
            collection_name = cf.gen_unique_str("BulkInsertChecker_")
        super().__init__(collection_name=collection_name, dim=dim, schema=schema, insert_data=insert_data)
        self.schema = cf.gen_bulk_insert_collection_schema() if schema is None else schema
        self.files = files
        self.recheck_failed_task = False
        self.failed_tasks = []
        self.failed_tasks_id = []
        self.use_one_collection = use_one_collection  # if True, all tasks will use one collection to bulk insert
        self.c_name = collection_name
        self.minio_endpoint = minio_endpoint
        self.bucket_name = bucket_name

    def prepare(self, data_size=100000):
        with RemoteBulkWriter(
                schema=self.schema,
                file_type=BulkFileType.NUMPY,
                remote_path="bulk_data",
                connect_param=RemoteBulkWriter.ConnectParam(
                    endpoint=self.minio_endpoint,
                    access_key="minioadmin",
                    secret_key="minioadmin",
                    bucket_name=self.bucket_name
                )
        ) as remote_writer:

            for _ in range(data_size):
                row = cf.gen_row_data_by_schema(nb=1, schema=self.schema)[0]
                remote_writer.append_row(row)
            remote_writer.commit()
            batch_files = remote_writer.batch_files
            log.info(f"batch files: {batch_files}")
            self.files = batch_files[0]

    def update(self, files=None, schema=None):
        if files is not None:
            self.files = files
        if schema is not None:
            self.schema = schema

    def get_bulk_insert_task_state(self):
        from pymilvus import utility
        state_map = {}
        for task_id in self.failed_tasks_id:
            state = utility.get_bulk_insert_state(task_id=task_id, using=self.alias)
            state_map[task_id] = state
        return state_map

    @trace()
    def bulk_insert(self):
        log.info(f"bulk insert collection name: {self.c_name}")
        from pymilvus import utility
        task_ids = utility.do_bulk_insert(collection_name=self.c_name, files=self.files, using=self.alias)
        log.info(f"task ids {task_ids}")
        completed = utility.wait_for_bulk_insert_tasks_completed(task_ids=[task_ids], timeout=720, using=self.alias)
        return task_ids, completed

    @exception_handler()
    def run_task(self):
        if not self.use_one_collection:
            if self.recheck_failed_task and self.failed_tasks:
                self.c_name = self.failed_tasks.pop(0)
                log.debug(f"check failed task: {self.c_name}")
            else:
                self.c_name = cf.gen_unique_str("BulkInsertChecker_")
        self.milvus_client.create_collection(collection_name=self.c_name, schema=self.schema)
        log.info(f"collection schema: {self.milvus_client.describe_collection(self.c_name)}")
        # bulk insert data
        num_entities = self.milvus_client.get_collection_stats(collection_name=self.c_name).get("row_count", 0)
        log.info(f"before bulk insert, collection {self.c_name} has num entities {num_entities}")
        task_ids, completed = self.bulk_insert()
        num_entities = self.milvus_client.get_collection_stats(collection_name=self.c_name).get("row_count", 0)
        log.info(f"after bulk insert, collection {self.c_name} has num entities {num_entities}")
        if not completed:
            self.failed_tasks.append(self.c_name)
            self.failed_tasks_id.append(task_ids)
        return task_ids, completed

    def keep_running(self):
        self.prepare()
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP / 10)


class AlterCollectionChecker(Checker):
    def __init__(self, collection_name=None, schema=None):
        if collection_name is None:
            collection_name = cf.gen_unique_str("AlterCollectionChecker")
        super().__init__(collection_name=collection_name, schema=schema, enable_dynamic_field=False)
        self.milvus_client.release_collection(collection_name=self.c_name)
        res = self.milvus_client.describe_collection(collection_name=self.c_name)
        log.info(f"before alter collection {self.c_name} schema: {res}")
        # alter collection attributes
        self.milvus_client.alter_collection_properties(collection_name=self.c_name,
                                                       properties={"mmap.enabled": True})
        self.milvus_client.alter_collection_properties(collection_name=self.c_name,
                                                       properties={"collection.ttl.seconds": 3600})
        self.milvus_client.alter_collection_properties(collection_name=self.c_name, 
                                                       properties={"dynamicfield.enabled": True})
        res = self.milvus_client.describe_collection(collection_name=self.c_name)
        log.info(f"after alter collection {self.c_name} schema: {res}")
        
    @trace()
    def alter_check(self):
        try:
            res = self.milvus_client.describe_collection(collection_name=self.c_name)
            properties = res.get("properties", {})
            if properties.get("mmap.enabled") != "True":
                return res, False
            if properties.get("collection.ttl.seconds") != "3600":
                return res, False
            if res["enable_dynamic_field"] != True:
                return res, False
            return res, True
        except Exception as e:
            return str(e), False

    @exception_handler()
    def run_task(self):
        res, result = self.alter_check()
        return res, result

    def keep_running(self):
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP)


class TestResultAnalyzer(unittest.TestCase):
    def test_get_stage_success_rate(self):
        ra = ResultAnalyzer()
        res = ra.get_stage_success_rate()
        print(res)


if __name__ == '__main__':
    unittest.main()
